自己也在nodeseek搜了搜,发现不少人都在使用,但没找到自己需要的简单版本,于是自己弄了个。
python3 请求rss地址并解析后进行关键词检查,有提到关键词则推送,没有关键词不处理。
我设置的定时是5分钟一次,不至于错过新文章,也不会频繁访问nodeseek获取rss导致IP异常。
且每次推送的地址记录到sqlite数据库,不至于一篇文章推送多次的情况。
代码:
#!/usr/bin/env python3
import feedparser
import requests
import json
import sqlite3
from datetime import datetime
from pathlib import Path
# 配置参数
RSS_URL = "https://www.nodeseek.com/rss.xml"
KEYWORDS = ['12.6','抽奖','免费','free','抽','bagevm','原生','原生IP','联通快乐','奖']
# 飞书配置
FEISHU_WEBHOOK = 'https://open.feishu.cn/open-apis/bot/v2/hook/你的token'
# Telegram 配置
TELEGRAM_BOT_TOKEN = '机器人token'
TELEGRAM_CHAT_ID = '你的TG-id' # 例如 '123456789'
# 数据库文件路径
DB_PATH = Path(__file__).parent / "processed_entries.db"
def init_db():
"""初始化数据库,创建表(如果不存在)"""
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS processed_entries (
link TEXT PRIMARY KEY,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
def is_processed(link):
"""检查文章是否已处理过"""
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM processed_entries WHERE link = ?", (link,))
return cursor.fetchone() is not None
def mark_as_processed(link):
"""标记文章为已处理"""
with sqlite3.connect(DB_PATH) as conn:
conn.execute("INSERT OR IGNORE INTO processed_entries (link) VALUES (?)", (link,))
def check_feeds():
try:
# 1. 获取 RSS 内容(添加编码处理)
response = requests.get(RSS_URL)
response.encoding = 'utf-8' # 强制使用UTF-8解码
if response.status_code != 200:
print(f"RSS请求失败: {response.status_code}")
return
# 2. 解析 XML(添加字符编码声明)
feed = feedparser.parse(response.content) # 使用response.content代替text
if feed.bozo:
print(f"RSS解析错误: {feed.bozo_exception}")
return
# 3. 处理每篇文章
for entry in feed.entries:
if is_processed(entry.link):
continue # 跳过已处理文章
content = (entry.title + ' ' + getattr(entry, 'summary', '')).lower()
matched_keywords = [kw for kw in KEYWORDS if kw.lower() in content]
if matched_keywords:
message = format_message(entry.title, entry.link, matched_keywords)
send_to_feishu(message) # 飞书推送
send_to_telegram(message) # Telegram 推送
mark_as_processed(entry.link) # 标记为已处理
except Exception as e:
print(f"处理RSS源时出错: {str(e)}")
def format_message(title, link, keywords):
"""格式化推送消息"""
return f"发现关键词: 【{', '.join(keywords)}】\n\n标题: {title}\n链接: {link}"
def send_to_feishu(message):
"""飞书推送"""
payload = {
"msg_type": "text",
"content": {"text": message}
}
try:
response = requests.post(
FEISHU_WEBHOOK,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=10
)
print("飞书已推送")
except Exception as e:
print(f"飞书推送异常: {str(e)}")
def send_to_telegram(message):
"""Telegram 推送"""
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
"disable_web_page_preview": False
}
try:
response = requests.post(
url,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=10
)
print("Telegram已推送" if response.status_code == 200 else f"Telegram推送失败: {response.text}")
except Exception as e:
print(f"Telegram推送异常: {str(e)}")
if __name__ == '__main__':
init_db() # 确保数据库已初始化
check_feeds()
执行效果:
想法:
突然想到可以给TG机器人弄个后端,然后使用命令来进行交互,比如增删关键词,比如更深入的一些访问文章屏蔽某些用户的发帖。
就是不知道飞书机器人支不支持。
注意:
执行python3的时候会需要导入库,具体方法自行ai即可。
正文结束